#include <srs_protocol_rtmp.hpp>
#include <srs_protocol_rtmp_stack.hpp>
#include <srs_lib_simple_socket.hpp>
#include <srs_core_autofree.hpp>
#include <srs_librtmp.hpp>
#include <process.h>

#define InitSockets()	{\
	WORD version;			\
	WSADATA wsaData;		\
	\
	version = MAKEWORD(1,1);	\
	WSAStartup(version, &wsaData);	}

#define	CleanupSockets()	WSACleanup()

int fmle_publish(SrsRtmpServer* rtmp, SrsRequest* req)
{
	int ret = ERROR_SUCCESS;

	bool hasHead = false;
	char head[40] = {0};
	char commonHead[4] = {0x00, 0x00, 0x00, 0x01};

	FILE *fp = fopen("video.x264", "wb");
	while (true) {
		Sleep(0);

		SrsMessage* msg = NULL;
		if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) {
			srs_error("fmle recv identify client message failed. ret=%d", ret);
			return ret;
		}else
			printf("fmle recv identify client message ret=%d\n", msg->size);

		SrsAutoFree(SrsMessage, msg);

		if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
			SrsPacket* pkt = NULL;
			if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
				srs_error("fmle decode unpublish message failed. ret=%d", ret);
				return ret;
			}

			SrsAutoFree(SrsPacket, pkt);

			if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {
				SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt);
				if ((ret = rtmp->fmle_unpublish(10, unpublish->transaction_id)) != ERROR_SUCCESS) {
					return ret;
				}
				return ERROR_CONTROL_REPUBLISH;
			}

			srs_trace("fmle ignore AMF0/AMF3 command message.");
			continue;
		}

		if (msg->header.is_audio()) {
			//printf("audio\n");
		}
		if (msg->header.is_video()) {
			printf("video\n");
			if (!hasHead)
			{
				hasHead = true;
				memcpy(head, commonHead, 4);
				memcpy(head + 4, msg->payload + 13, 28);
				memcpy(head + 4 + 28, commonHead, 4);
				memcpy(head + 36, msg->payload + 13 + 28 + 3, 4);
				fwrite(head, 1, 40, fp);
				fflush(fp);
			}else
			{
				fwrite(commonHead + 1, 1, 3, fp);
				fwrite(msg->payload+9, 1, msg->size - 9, fp);
				fflush(fp);
			}
			//fwrite(msg->payload, 1, msg->size, fp);
			//fflush(fp);
		}
		if (msg->header.is_aggregate()) {
			//printf("aggregate\n");
		}
		if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
			//printf("data\n");
		}
	}

	return ret;
}

void server_thread(void *arg)
{
	SimpleSocketStream* server = (SimpleSocketStream*)arg;
	SrsRtmpServer* rtmp = new SrsRtmpServer(server);
	SrsRequest* req = new SrsRequest();

	int ret = ERROR_SUCCESS;
	
	if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {
		return;
	}

	SrsMessage* msg = NULL;
	SrsConnectAppPacket* pkt = NULL;
	if ((ret = srs_rtmp_expect_message<SrsConnectAppPacket>(rtmp->get_protocol(), &msg, &pkt)) != ERROR_SUCCESS) {
		return ;
	}
	if ((ret = rtmp->response_connect_app(req, 0)) != ERROR_SUCCESS) {
		return;
	}
	while (true) {
		SrsRtmpConnType type;
		if ((ret = rtmp->identify_client(10, type, req->stream, req->duration)) != ERROR_SUCCESS){
			delete req;
			delete rtmp;
			delete server;
			return;
		}
		assert(type == SrsRtmpConnFMLEPublish);
		req->strip();
		rtmp->start_fmle_publish(10);
		int ret = fmle_publish(rtmp, req);
		if ((ret != ERROR_CONTROL_REPUBLISH) && (ret != ERROR_CONTROL_RTMP_CLOSE)) {
			break;
		}
	}
}

int test()
{
	InitSockets();
	SimpleSocketStream socket;
	socket.create_socket();
	socket.listen("127.0.0.1", 1935, 10);

	SimpleSocketStream* server = new SimpleSocketStream;
	while (true)
	{
		if (0 == socket.accept(*server))
		{
			_beginthread(server_thread, 0, server);
			server = new SimpleSocketStream;
		}
		else
		{
			Sleep(10);
		}
	} 
	CleanupSockets();
}
